Prevent multiple device list updates from breaking a batch send #5156
Conversation
Hm, this is currently failing: the generated query is not valid SQL. Searching tells me we need to use an … instead. . o ( Perhaps this was the reason we didn't have an … before )
Codecov Report
@@ Coverage Diff @@
## develop #5156 +/- ##
===========================================
+ Coverage 62.99% 63.08% +0.08%
===========================================
Files 341 341
Lines 35607 35624 +17
Branches 5827 5828 +1
===========================================
+ Hits 22432 22474 +42
+ Misses 11605 11584 -21
+ Partials 1570 1566 -4
I think you've ended up overcomplicating this; could you have another go?
Also: we try to minimise the work we do in a `_txn` function, since you're hanging onto a database connection which could otherwise be doing useful work. Suggest returning the raw list and doing the deduping magic in the parent?
(this will probably require you to split `_get_devices_by_remote_txn` in two, but afaict there is no need for both parts of it to be done on the same transaction, so that should be fine)
synapse/storage/devices.py
Outdated
# being that such a large device list update is likely an error.
#
# Note: The code below assumes this value is at least 1
maximum_devices = 100
this should probably be passed in as a `limit` param, so that it can be derived from https://github.com/matrix-org/synapse/blob/develop/synapse/federation/sender/per_destination_queue.py#L37.
synapse/storage/devices.py
Outdated
# maps (user_id, device_id) -> stream_id
query_map = {(r[0], r[1]): r[2] for r in txn}
if not query_map:
duplicate_updates = [r for r in txn]
- duplicate_updates = [r for r in txn]
+ duplicate_updates = list(txn)
synapse/storage/devices.py
Outdated
update = duplicate_updates[i]
prev_update = duplicate_updates[i - 1]

if (update[0], update[1]) == (prev_update[0], prev_update[1]):
I'm not quite following what's going on here, but I think you're assuming that duplicates can only occur on adjacent rows, which is not the case.
I suggest you just build a dict which maps from `(user_id, device_id)` to `stream_id`, and then you can iterate over the results and check the dict for each row.
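The suggested dict-based approach can be sketched as follows (the row shape `(user_id, device_id, stream_id)` and the `query_map` name are taken from the surrounding diff; the helper name is hypothetical). Unlike comparing each row with its predecessor, this collapses duplicates even when they are not adjacent in the result set:

```python
def build_query_map(updates):
    """Map each (user_id, device_id) to its newest stream_id."""
    query_map = {}
    for user_id, device_id, stream_id in updates:
        key = (user_id, device_id)
        # max() keeps the most recent stream_id for each device,
        # regardless of where its rows appear in the results.
        query_map[key] = max(query_map.get(key, 0), stream_id)
    return query_map
```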
* develop:
  - Revert 085ae34
  - Add a DUMMY stage to captcha-only registration flow
  - Make Prometheus snippet less confusing on the metrics collection doc (#4288)
  - Set syslog identifiers in systemd units (#5023)
  - Run Black on the tests again (#5170)
  - Add AllowEncodedSlashes to apache (#5068)
  - remove instructions for jessie installation (#5164)
  - Run `black` on per_destination_queue
  - Limit the number of EDUs in transactions to 100 as expected by receiver (#5138)
getting there, but I think it could do with a bit of cleaning up to be clear and elegant. I've made a few suggestions, but please don't feel constrained by them: have a look at the code yourself and ask yourself if there are things that could be simplified.
[Incidentally, some of this stuff is getting a bit gnarly. Some UTs for `get_devices_by_remote` wouldn't go amiss.]
@@ -351,7 +351,7 @@ def _get_new_device_messages(self, limit):
last_device_list = self._last_device_list_stream_id
# Will return at most 20 entries
this doesn't look right any more
@@ -351,7 +351,7 @@ def _get_new_device_messages(self, limit):
last_device_list = self._last_device_list_stream_id
# Will return at most 20 entries
now_stream_id, results = yield self._store.get_devices_by_remote(
-     self._destination, last_device_list
+     self._destination, last_device_list, limit=limit - 1,
why - 1?
synapse/storage/devices.py
Outdated
"""Get stream of updates to send to remote servers | ||
|
||
Returns: | ||
(int, list[dict]): current stream id and list of updates | ||
""" | ||
if limit < 1: | ||
raise StoreError("Device limit must be at least 1") |
RuntimeError is probably more appropriate here. It's not really a failure at the storage layer.
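A minimal sketch of the suggested change (the signature follows the diff above; the body is stubbed out): a limit below 1 is a caller bug rather than a failure at the storage layer, so `RuntimeError` fits better than `StoreError`.

```python
def get_devices_by_remote(destination, from_stream_id, limit):
    # A caller passing a nonsensical limit is a programming error,
    # not a database problem, hence RuntimeError.
    if limit < 1:
        raise RuntimeError("Device limit must be at least 1")
    # ... the actual query is elided in this sketch ...
    return from_stream_id, []
```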
synapse/storage/devices.py
Outdated

def _get_max_stream_id_for_devices_txn(
It's not really getting the max stream_id. Indeed it's now just returning the `now_stream_id` which gets passed in, so we could simplify that and just return `results`.
What it's really doing is building the device EDUs, including the e2e keys. So it could do with a better name too.
synapse/storage/devices.py
Outdated
""" | ||
txn.execute(sql, (destination, from_stream_id, now_stream_id, False)) | ||
txn.execute(sql, (destination, from_stream_id, now_stream_id, False, limit + 1)) |
At the moment there's a bit of a funny split where the fancy logic for clipping the list is in the parent, but we've got a `+ 1` and the long comment in this function. I think it would be more intuitive to make `_get_devices_by_remote_txn` dumb, and move the `+ 1` and the comment to the parent too.
synapse/storage/devices.py
Outdated

# If we ended up not being left over with any device updates to send
# out, then skip this stream_id
if len(query_map) == 0:
`if not query_map`
synapse/storage/devices.py
Outdated
query_map[key] = update[2]

# If we ended up not being left over with any device updates to send
# out, then skip this stream_id
this comment could do with an explanation as to what it means if we ended up in this situation.
synapse/storage/devices.py
Outdated
continue

key = (update[0], update[1])
if key in query_map and query_map[key] >= update[2]:
I'd be inclined to write this:

    query_map[key] = max(query_map.get(key, 0), update[2])
synapse/storage/devices.py
Outdated
# out, then skip this stream_id
if len(query_map) == 0:
    defer.returnValue((now_stream_id + 1, []))
elif len(query_map) >= limit:
`query_map` is deduplicated, so it might have fewer than `limit` entries even if we hit the limit. As above, I think you can combine the clip of `now_stream_id` with this.
synapse/storage/devices.py
Outdated
@@ -72,7 +72,8 @@ def get_devices_by_user(self, user_id):

defer.returnValue({d["device_id"]: d for d in devices})

- def get_devices_by_remote(self, destination, from_stream_id):
+ @defer.inlineCallbacks
+ def get_devices_by_remote(self, destination, from_stream_id, limit=100):
the default for `limit` is redundant, and probably just confusing?
synapse/storage/devices.py
Outdated

stream_id_cutoff = now_stream_id + 1

# Check if the last and second-to-last row's stream_id's are the same
- # Check if the last and second-to-last row's stream_id's are the same
+ # Check if the last and second-to-last rows' stream_ids are the same
synapse/storage/devices.py
Outdated
if (
    len(updates) > 1 and
    len(updates) > limit and
    updates[-1][2] == updates[-2][2]
I think this condition is redundant. We may as well set the `stream_id_cutoff` to the stream_id of the last row whenever we exceed the limit. Note that this also makes the `len(updates) > 1` condition redundant.
synapse/storage/devices.py
Outdated
# thus we're just going to assume it was a client-side error and not
# send them. We return an empty list of updates instead.
if not query_map:
    defer.returnValue((now_stream_id + 1, []))
note that the token we return here is used as the `from_token` next time, which is exclusive, so this should be `now_stream_id` rather than `now_stream_id + 1`.
Also: there is no need to skip everything between the `stream_id_cutoff` and `now_stream_id`: we should return `stream_id_cutoff - 1` instead.
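Putting the clipping and token suggestions together, a rough illustration (names, the `(user_id, device_id, stream_id)` row shape, and the `LIMIT limit + 1` query convention are assumed from the diffs in this review; this is a sketch, not Synapse's actual implementation):

```python
def clip_and_dedupe(updates, now_stream_id, limit):
    """Clip a batch of device updates and dedupe them into a query map.

    `updates` are rows fetched with LIMIT ``limit + 1``, ordered by
    stream_id.  Returns (token, query_map), where the token is usable
    as an exclusive from_token on the next call.
    """
    stream_id_cutoff = now_stream_id + 1
    if len(updates) > limit:
        # A batch boundary falls inside the last stream_id: drop every
        # row at or beyond it and resume from there next time.  No need
        # for the adjacent-stream_id or len(updates) > 1 checks.
        stream_id_cutoff = updates[-1][2]

    query_map = {}
    for user_id, device_id, stream_id in updates:
        if stream_id >= stream_id_cutoff:
            continue
        key = (user_id, device_id)
        query_map[key] = max(query_map.get(key, 0), stream_id)

    # from_token is exclusive, so return the cutoff minus one rather
    # than skipping everything up to now_stream_id.
    return stream_id_cutoff - 1, query_map
```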
synapse/storage/devices.py
Outdated
key = (update[0], update[1])
query_map[key] = max(query_map.get(key, 0), update[2])

# If we ended up not being left over with any device updates to send
I think this could be phrased more clearly:

> If we didn't find any updates with a stream_id lower than the cutoff, it means that there are more than `limit` updates all of which have the same stream_id. That should only happen if a client is spamming the server with new devices, in which case E2E isn't going to work well anyway. We'll just skip that stream_id and return an empty list, and continue with the next stream_id next time.
synapse/storage/devices.py
Outdated
self._get_device_update_edus_by_remote_txn,
destination,
from_stream_id,
now_stream_id,
do we not want `stream_id_cutoff - 1` here too?
actually, `_get_device_update_edus_by_remote_txn` doesn't seem to use this param. let's kill it!
limit,
)

defer.returnValue((now_stream_id, results))
and here.
maybe we can get rid of `stream_id_cutoff` altogether, and just use `now_stream_id`?
synapse/storage/devices.py
Outdated
):
"""Return device update information for a given remote destination"""
a bit of docstring on the params and results (here and `_get_device_update_edus_by_remote_txn`) wouldn't go amiss.
synapse/storage/devices.py
Outdated

def _get_device_update_edus_by_remote_txn(
    self, txn, destination, from_stream_id, now_stream_id, query_map, limit
`limit` is unused as well.
synapse/storage/devices.py
Outdated

def _get_device_update_edus_by_remote_txn(
does this need to be a `txn` function? if you make it do `runInteraction` itself, we can free up the db connection for the second half of the function.
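A toy illustration of that shape, with `sqlite3` and a simplified `run_interaction` standing in for Synapse's database layer (the table name, column names, and helper names here are all made up for the sketch): the function performs its own interaction for the query, so the connection is released before the second, EDU-building half runs.

```python
import sqlite3

def run_interaction(conn, func, *args):
    # Toy stand-in for runInteraction: hand func a cursor and release
    # it as soon as func returns.
    cur = conn.cursor()
    try:
        return func(cur, *args)
    finally:
        cur.close()

def _fetch_device_rows_txn(txn, destination, from_stream_id, now_stream_id):
    # First half: the only part that needs to hold the connection.
    txn.execute(
        "SELECT user_id, device_id, stream_id FROM device_updates "
        "WHERE destination = ? AND stream_id > ? AND stream_id <= ? "
        "ORDER BY stream_id",
        (destination, from_stream_id, now_stream_id),
    )
    return txn.fetchall()

def get_device_update_edus(conn, destination, from_stream_id, now_stream_id):
    rows = run_interaction(
        conn, _fetch_device_rows_txn, destination, from_stream_id, now_stream_id
    )
    # Second half: build the EDUs with the connection already freed.
    return [{"user_id": u, "device_id": d, "stream_id": s} for u, d, s in rows]
```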
Synapse 1.0.0rc1 (2019-06-07)
=============================

Bugfixes (excerpt)
------------------

- Prevent federation device list updates breaking when processing multiple updates at once. ([\#5156](#5156))
Fixes #5153 and towards fixing #5095.

This PR limits the number of device list updates sent in a batch, and checks whether the last update's stream_id matches that of the update before it. If so, an update is running over the bounds of the batch, and we try to send everything but updates with that device ID.

cc @richvdh